package com.isyscore.os.metadata.service.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.isyscore.boot.login.LoginUserManager;
import com.isyscore.os.core.exception.DataFactoryException;
import com.isyscore.os.core.exception.ErrorCode;
import com.isyscore.os.core.model.entity.DataFlowDefinition;
import com.isyscore.os.core.model.entity.DataFlowTask;
import com.isyscore.os.core.util.InitiallyUtils;
import com.isyscore.os.metadata.dao.DataFlowDefinitionMapper;
import com.isyscore.os.metadata.database.AbstractDatabase;
import com.isyscore.os.metadata.enums.DataSourceTypeEnum;
import com.isyscore.os.metadata.enums.KettleDataFlowNodeType;
import com.isyscore.os.metadata.kettle.base.FlowConfig;
import com.isyscore.os.metadata.kettle.base.FlowMetaCodec;
import com.isyscore.os.metadata.kettle.dto.*;
import com.isyscore.os.metadata.kettle.vis.*;
import com.isyscore.os.metadata.manager.DatabaseManager;
import com.isyscore.os.metadata.manager.QuartzJobManager;
import com.isyscore.os.metadata.model.dto.DataSourceDTO;
import com.isyscore.os.metadata.model.dto.req.FlowDTO;
import com.isyscore.os.metadata.model.dto.req.FlowDefinitionModifyDTO;
import com.isyscore.os.metadata.model.dto.req.FlowScheduleModifyDTO;
import com.isyscore.os.metadata.model.vo.ResultVO;
import com.isyscore.os.metadata.service.DataFlowDefinitionService;
import com.isyscore.os.metadata.service.DataFlowTaskService;
import com.isyscore.os.metadata.service.DataSourceService;
import com.isyscore.os.metadata.service.ETLFlowNodeService;
import com.isyscore.os.metadata.service.quartz.DataFlowJobTask;
import com.isyscore.os.metadata.utils.SnowflakeUtil;
import com.isyscore.os.metadata.utils.SqlUtil;
import com.isyscore.os.metadata.utils.TimeUtil;
import com.isyscore.os.metadata.utils.UdmpBeanUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.CheckResultInterface;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;

import static com.isyscore.os.core.exception.ErrorCode.*;
import static com.isyscore.os.metadata.constant.CommonConstant.DATA_FLOW_JOB_GROUP;

/**
 * Service implementation for ETL data-flow definitions: create/update/publish
 * definitions, manage their Quartz schedules, and expose helper lookups
 * (table fields, step input/output fields, data sources used by a flow).
 *
 * <p>Publishing compiles the user's visual graph ({@code VisGraph}) into a Kettle
 * transformation XML via {@code FlowParser}/{@code FlowMetaCodec} and stores it
 * on the definition row; scheduling is delegated to {@code QuartzJobManager}.
 */
@Service
@Slf4j
public class DataFlowDefinitionServiceImpl extends ServiceImpl<DataFlowDefinitionMapper, DataFlowDefinition> implements DataFlowDefinitionService {
    public static final Integer ENABLE_SCHEDULE = 1;
    public static final Integer DISABLE_SCHEDULE = 0;
    public static final Integer ENABLE_REPEAT_RUN = 1;
    public static final Integer DISABLE_REPEAT_RUN = 0;
    public static final Integer PUBLISH_STATUS_SAVED = 0;
    public static final Integer PUBLISH_STATUS_PUBLISHED = 1;
    public static final String ENABLE_INCRE_RUN = "YES";
    public static final String DISABLE_INCRE_RUN = "NO";

    @Autowired
    SnowflakeUtil snowflakeUtil;
    @Autowired
    FlowMetaCodec codec;
    @Autowired
    FlowParser flowParser;
    @Autowired
    private DataSourceService dataSourceService;
    @Autowired
    private QuartzJobManager dataFlowJobManager;
    @Autowired
    private DataFlowTaskService dataFlowTaskService;
    // Self-injection (proxy reference) so calls go through the transactional proxy.
    @Autowired
    private DataFlowDefinitionService dataFlowDefinitionService;
    @Autowired
    private LoginUserManager loginUserManager;
    @Autowired
    private DatabaseManager databaseManager;
    @Autowired
    private DataFlowDefinitionMapper definitionMapper;
    @Autowired
    private ETLFlowNodeService etlFlowNodeService;

    /**
     * Saves the definition, validates its node graph, compiles it to Kettle XML
     * and marks it published; refreshes the Quartz schedule when repeat-run is
     * fully configured.
     *
     * @param definition the definition payload including the visual graph
     * @return the definition id as a string
     * @throws DataFactoryException on validation or publish failure
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String publishFlowDefinition(FlowDefinitionModifyDTO definition) {
        // Extra runtime parameters handed to the flow engine through the user config.
        Map<String, String> param = new HashMap<>();
        param.put("logSize", definition.getLogSize() == null ? "200" : definition.getLogSize().toString());
        // 是否开启任务重试 (whether task retry is enabled; defaults to 0 = off)
        param.put("retryRun", String.valueOf(definition.getRetryRun() == null ? 0 : definition.getRetryRun()));
        param.put("definitionId", String.valueOf(definition.getDefinitionId()));
        definition.getUserConfig().setOtherParam(param);

        DataFlowDefinition dataFlowDefinition = new DataFlowDefinition();
        //1.保存/更新 — persist first so a later validation failure does not lose the draft
        String definitionId = createOrUpdateFlowDefinition(definition);
        //2.校验 — fail fast if any node fails graph validation
        List<FlowNodeValidatedResultDTO> validatedResults = etlFlowNodeService.validateGraph(definition.getUserConfig());
        boolean hasError = validatedResults.stream().anyMatch(r -> !r.getPassCheck());
        if (hasError) {
            throw new DataFactoryException(KETTLE_ETL_CHECK_FAIL);
        }
        //3.发布 — compile the visual graph into a Kettle transformation
        // NOTE(review): exceptions thrown inside this try (e.g. SCHEDULE_PROHIBIT_PARAM)
        // are re-wrapped as KETTLE_ETL_CREATE_FAIL by the catch below; kept as-is since
        // callers may depend on the resulting error code.
        try {
            FlowConfig flowConfig = flowParser.parseFlow(definition.getUserConfig(), definition.getFlowName(), true);

            //是否为增量任务，并提取增量信息 — detect incremental tasks and copy their metadata
            dataFlowDefinition.setIsIncre(DISABLE_INCRE_RUN);
            Map<String, Object> increNode = getIncreNode(definition);
            if (increNode != null && ENABLE_INCRE_RUN.equals(String.valueOf(increNode.get("isIncre")))) {
                dataFlowDefinition.setIsIncre(String.valueOf(increNode.get("isIncre")));
                dataFlowDefinition.setIncreField(String.valueOf(increNode.get("increField")));
                dataFlowDefinition.setIncreFieldType(String.valueOf(increNode.get("increFieldType")));
                dataFlowDefinition.setIncrTable(String.valueOf(increNode.get("incrTable")));
            }

            //开启重复运行需要检查动态参数 — scheduled flows must not contain ${...} parameters
            if (definition.getRepeatRun() && checkHaveParam(flowConfig)) {
                throw new DataFactoryException(KETTLE_ETL_SCHEDULE_PROHIBIT_PARAM);
            }
            //重复运行必须设置开始时间和间隔 — repeat-run requires start time, interval and unit
            if (definition.getRepeatRun()
                    && (ObjectUtil.isNull(definition.getScheduleStartTime())
                    || ObjectUtil.isNull(definition.getScheduleInterval())
                    || StrUtil.isEmpty(definition.getScheduleIntervalTimeUnit()))) {
                throw new DataFactoryException(REPEAT_RUN_PARAM_ERROR);
            }
            String flowXml = codec.decode(flowConfig, true).getXML();
            dataFlowDefinition.setKettleDefinition(flowXml);
            dataFlowDefinition.setPublishStatus(PUBLISH_STATUS_PUBLISHED);
            dataFlowDefinition.setId(Long.parseLong(definitionId));
            dataFlowDefinition.setScheduled(ENABLE_SCHEDULE);

            this.updateById(dataFlowDefinition);

            //调度任务刷新 — (re)register the Quartz job when repeat-run is fully configured
            if (definition.getRepeatRun()
                    && ObjectUtil.isNotNull(definition.getScheduleStartTime())
                    && StrUtil.isNotBlank(definition.getScheduleIntervalTimeUnit())
                    && ObjectUtil.isNotNull(definition.getScheduleInterval())) {
                refreshDataJob(dataFlowDefinition.getId());
            }

        } catch (Exception e) {
            log.error("流程创建失败：{}", e.getLocalizedMessage(), e);
            throw new DataFactoryException(KETTLE_ETL_CREATE_FAIL, e.getLocalizedMessage());
        }
        return dataFlowDefinition.getId().toString();
    }

    /**
     * Updates an existing definition draft (publish status reset to SAVED) and
     * removes any previously registered schedule for it.
     *
     * @param definition the definition payload; {@code definitionId} must exist
     * @return the definition id as a string
     * @throws DataFactoryException when the id is unknown or the update fails
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String createOrUpdateFlowDefinition(FlowDefinitionModifyDTO definition) {
        DataFlowDefinition old = dataFlowDefinitionService.getById(definition.getDefinitionId());
        if (ObjectUtil.isNull(old)) {
            throw new DataFactoryException(DEFINITION_ID_NOT_FOUND, definition.getDefinitionId());
        }

        DataFlowDefinition dataFlowDefinition = new DataFlowDefinition();
        try {
            dataFlowDefinition.setUserConfig(definition.getUserConfig().getRenderConfig());
            dataFlowDefinition.setRepeatRun(definition.getRepeatRun() ? ENABLE_REPEAT_RUN : DISABLE_REPEAT_RUN);
            dataFlowDefinition.setScheduleStartTime(definition.getScheduleStartTime());
            dataFlowDefinition.setScheduleInterval(definition.getScheduleInterval());
            dataFlowDefinition.setScheduleIntervalTimeUnit(definition.getScheduleIntervalTimeUnit());
            dataFlowDefinition.setPublishStatus(PUBLISH_STATUS_SAVED);
            dataFlowDefinition.setScheduled(definition.getScheduled() ? ENABLE_SCHEDULE : DISABLE_SCHEDULE);
            dataFlowDefinition.setUpdatedAt(LocalDateTime.now());
            dataFlowDefinition.setId(definition.getDefinitionId());
            dataFlowDefinition.setLogSize(definition.getLogSize());
            dataFlowDefinition.setRetryRun(definition.getRetryRun());
            this.updateById(dataFlowDefinition);

            //更新时，删除掉定时任务 — drop the stale Quartz job; publish re-registers it
            if (definition.getDefinitionId() != null) {
                delDataJob(definition.getDefinitionId());
            }

        } catch (Exception e) {
            log.error("流程创建/更新失败：{}", e.getLocalizedMessage(), e);
            throw new DataFactoryException(KETTLE_ETL_CREATE_FAIL, e.getLocalizedMessage());
        }

        return dataFlowDefinition.getId().toString();
    }

    /**
     * Full update of a published definition: recompiles the Kettle XML, refreshes
     * incremental metadata and re-registers or removes the Quartz schedule.
     *
     * @param definitionId id of the definition to update (must exist)
     * @param definition   the new definition payload
     * @return the definition id as a string
     * @throws DataFactoryException when the id is unknown or the update fails
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String updateFlowDefinition(String definitionId, FlowDefinitionModifyDTO definition) {
        DataFlowDefinition old = dataFlowDefinitionService.getById(definitionId);
        if (ObjectUtil.isNull(old)) {
            throw new DataFactoryException(DEFINITION_ID_NOT_FOUND, definitionId);
        }
        DataFlowDefinition dataFlowDefinition = new DataFlowDefinition();
        try {
            FlowConfig flowConfig = flowParser.parseFlow(definition.getUserConfig(), old.getName(), true);
            //检查动态参数 — scheduled flows must not contain ${...} parameters
            if (definition.getRepeatRun() && checkHaveParam(flowConfig)) {
                throw new DataFactoryException(KETTLE_ETL_SCHEDULE_PROHIBIT_PARAM);
            }
            //重复运行必须设置开始时间和间隔 — repeat-run requires start time, interval and unit
            if (definition.getRepeatRun()
                    && (ObjectUtil.isNull(definition.getScheduleStartTime())
                    || ObjectUtil.isNull(definition.getScheduleInterval())
                    || StrUtil.isEmpty(definition.getScheduleIntervalTimeUnit()))) {
                throw new DataFactoryException(REPEAT_RUN_PARAM_ERROR);
            }
            String flowXml = codec.decode(flowConfig, true).getXML();
            BeanUtil.copyProperties(definition, dataFlowDefinition);

            //是否为增量任务，并提取增量信息 — refresh incremental metadata if present
            Map<String, Object> increNode = getIncreNode(definition);
            if (increNode != null) {
                dataFlowDefinition.setIsIncre(String.valueOf(increNode.get("isIncre")));
                dataFlowDefinition.setIncreField(String.valueOf(increNode.get("increField")));
                dataFlowDefinition.setIncreFieldType(String.valueOf(increNode.get("increFieldType")));
                dataFlowDefinition.setIncrTable(String.valueOf(increNode.get("incrTable")));
            }

            dataFlowDefinition.setId(Long.valueOf(definitionId));
            // Name is immutable here; renames go through updateFlowBase.
            dataFlowDefinition.setName(old.getName());
            dataFlowDefinition.setUserConfig(definition.getUserConfig().getRenderConfig());
            dataFlowDefinition.setKettleDefinition(flowXml);
            dataFlowDefinition.setRepeatRun(definition.getRepeatRun() ? ENABLE_REPEAT_RUN : DISABLE_REPEAT_RUN);
            dataFlowDefinition.setScheduleStartTime(definition.getScheduleStartTime());
            dataFlowDefinition.setScheduleInterval(definition.getScheduleInterval());
            dataFlowDefinition.setScheduleIntervalTimeUnit(definition.getScheduleIntervalTimeUnit());
            dataFlowDefinition.setUpdatedAt(LocalDateTime.now());
            dataFlowDefinition.setUpdator(currentUserDisplayName());
            this.updateById(dataFlowDefinition);
            //调度任务刷新 — re-register the job when repeat-run + schedule config are valid,
            //otherwise make sure any stale job is removed
            if (ENABLE_REPEAT_RUN.equals(dataFlowDefinition.getRepeatRun())
                    && ENABLE_SCHEDULE.equals(old.getScheduled())
                    && ObjectUtil.isNotNull(dataFlowDefinition.getScheduleStartTime())
                    && StrUtil.isNotBlank(dataFlowDefinition.getScheduleIntervalTimeUnit())
                    && ObjectUtil.isNotNull(dataFlowDefinition.getScheduleInterval())) {
                refreshDataJob(dataFlowDefinition.getId());
            } else {
                delDataJob(dataFlowDefinition.getId());
            }
        } catch (Exception e) {
            log.error("流程定义更新失败：{}", e.getLocalizedMessage(), e);
            throw new DataFactoryException(KETTLE_ETL_CREATE_FAIL, e.getLocalizedMessage());
        }
        return dataFlowDefinition.getId().toString();
    }

    /**
     * Starts or stops the schedule of a definition, rejecting redundant
     * transitions and definitions that never enabled repeat-run.
     *
     * @param definition carries the target definition id and desired scheduled flag
     * @throws DataFactoryException on unknown id, redundant start/stop, or repeat-run disabled
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void updateFlowStatus(FlowScheduleModifyDTO definition) {
        DataFlowDefinition dataFlowDefinition = dataFlowDefinitionService.getById(definition.getDefinitionId());
        if (ObjectUtil.isNull(dataFlowDefinition)) {
            throw new DataFactoryException(DATA_FLOW_ID_NOT_FOUND, definition.getDefinitionId());
        }
        //判断是否重复启停 — reject starting an already-started / stopping an already-stopped flow
        if (definition.getScheduled() && ENABLE_SCHEDULE.equals(dataFlowDefinition.getScheduled())) {
            throw new DataFactoryException(KETTLE_TASK_HAS_START);
        }
        if (!definition.getScheduled() && DISABLE_SCHEDULE.equals(dataFlowDefinition.getScheduled())) {
            throw new DataFactoryException(KETTLE_TASK_HAS_STOP);
        }
        //判断是否开启重复运行 — only repeat-run flows can be scheduled
        if (DISABLE_REPEAT_RUN.equals(dataFlowDefinition.getRepeatRun())) {
            throw new DataFactoryException(KETTLE_TASK_NEED_REPEAT_RUN);
        }
        dataFlowDefinition.setScheduled(definition.getScheduled() ? ENABLE_SCHEDULE : DISABLE_SCHEDULE);
        dataFlowDefinitionService.updateById(dataFlowDefinition);
        //调度任务刷新 — FIX: compare boxed Integers with equals(), not ==
        if (ENABLE_SCHEDULE.equals(dataFlowDefinition.getScheduled())
                && ObjectUtil.isNotNull(dataFlowDefinition.getScheduleStartTime())
                && StrUtil.isNotBlank(dataFlowDefinition.getScheduleIntervalTimeUnit())
                && ObjectUtil.isNotNull(dataFlowDefinition.getScheduleInterval())) {
            refreshDataJob(dataFlowDefinition.getId());
        } else {
            delDataJob(dataFlowDefinition.getId());
        }
    }

    /**
     * Deletes a definition together with its Quartz job and all of its task runs.
     *
     * @param definitionId id of the definition to delete
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void deleteFlowDefinition(String definitionId) {
        this.removeById(definitionId);
        dataFlowJobManager.deleteJob(Long.parseLong(definitionId), DATA_FLOW_JOB_GROUP);
        dataFlowTaskService.remove(Wrappers.lambdaQuery(DataFlowTask.class).eq(DataFlowTask::getDefinitionId, definitionId));
    }

    /**
     * Loads one definition (via the paged query so last-run info is populated)
     * and attaches its stored visual graph.
     *
     * @param definitionId id of the definition
     * @return the definition DTO including its {@code VisGraph} user config
     * @throws DataFactoryException when the id is unknown
     */
    @Override
    public FlowDefinitionDTO getFlowDefinition(String definitionId) {
        FlowDefinitionPageReqDTO dto = new FlowDefinitionPageReqDTO();
        dto.setId(definitionId);
        List<FlowDefinitionDTO> records = pageFlowDefinition(dto).getRecords();
        if (records == null || records.isEmpty()) {
            throw new DataFactoryException(DEFINITION_ID_NOT_FOUND);
        }
        FlowDefinitionDTO flowDefinitionDTO = records.get(0);
        VisGraph visGraph = new VisGraph();
        visGraph.setRenderConfig(this.getById(definitionId).getUserConfig());
        flowDefinitionDTO.setUserConfig(visGraph);
        return flowDefinitionDTO;
    }

    /**
     * Pages definitions and enriches each row with the status and start time of
     * its most recent task run.
     *
     * @param pageRequest paging + filter criteria
     * @return a page of definition DTOs
     */
    @Override
    public IPage<FlowDefinitionDTO> pageFlowDefinition(FlowDefinitionPageReqDTO pageRequest) {
        IPage<FlowDefinitionDTO> page = definitionMapper.list(pageRequest.toPage(), pageRequest);

        return page.convert(defined -> {
            //找出最新的一条运行实例 — latest run instance for this definition
            LambdaQueryWrapper<DataFlowTask> queryWrapper = Wrappers.lambdaQuery(DataFlowTask.class)
                    .eq(DataFlowTask::getDefinitionId, defined.getDefinitionId())
                    .orderByDesc(DataFlowTask::getStartTime)
                    .last("limit 1");
            DataFlowTask dataFlowTask = dataFlowTaskService.getOne(queryWrapper, false);
            if (ObjectUtil.isNotNull(dataFlowTask)) {
                defined.setLastRunTime(dataFlowTask.getStartTime());
                defined.setStatus(dataFlowTask.getStatus());
            }
            return defined;
        });
    }

    /**
     * Runs Kettle's step check over the compiled graph and returns every
     * non-OK remark (type != 1) as text.
     *
     * @param userConfig the visual graph to check
     * @return human-readable check remarks; empty when all steps pass
     * @throws DataFactoryException when the graph cannot be compiled/checked
     */
    @Override
    public List<String> checkFlowDefinition(VisGraph userConfig) {
        try {
            TransMeta transMeta = (TransMeta) codec.decode(codec.decode(flowParser.parseFlow(userConfig, null, true), true).getXML());
            List<CheckResultInterface> remarks = new ArrayList<>();
            transMeta.checkSteps(remarks, false, null, transMeta, null, null);
            // type == 1 is Kettle's "OK" result; everything else is reported.
            return remarks.stream()
                    .filter(r -> r.getType() != 1)
                    .map(CheckResultInterface::toString)
                    .collect(Collectors.toList());
        } catch (Exception e) {
            log.error("kettle作业校验失败", e);
            throw new DataFactoryException(KETTLE_ETL_CHECK_FAIL);
        }
    }

    /**
     * Lists the column names of the given table, optionally escaped with the
     * dialect-specific identifier quoting of the table's data-source type.
     *
     * @param getTableFieldsDTO data-source id, database and table name
     * @param escape            whether to escape each column name for SQL use
     * @return the column names in table order
     */
    @Override
    public List<String> tableFields(GetTableFieldsDTO getTableFieldsDTO, boolean escape) {
        DataSourceDTO dataSource = dataSourceService.getDataSource(getTableFieldsDTO.getDataSourceId());
        dataSource.setDatabaseName(getTableFieldsDTO.getDatabaseName());
        // Strip an optional "schema." prefix and any quoting characters.
        String rawName = getTableFieldsDTO.getTableName();
        String table = (rawName.contains(".") ? rawName.split("\\.")[1] : rawName)
                .replaceAll("\"", "")
                .replaceAll("`", "");
        dataSource.setTableName(table);
        dataSource.setSelectDatabaseName(getTableFieldsDTO.getDatabaseName());
        ResultVO tableStruct = databaseManager.getTableStruct(dataSource);

        DataSourceDTO dataSourceDTO = new DataSourceDTO();
        dataSourceDTO.setType(dataSource.getType());
        AbstractDatabase db = databaseManager.findDb(dataSourceDTO);

        List<String> fieldList = tableStruct.getContent().stream()
                .map(ts -> String.valueOf(ts.get("columnName")))
                .collect(Collectors.toList());
        if (escape) {
            return fieldList.stream().map(db::escapeColName).collect(Collectors.toList());
        }
        return fieldList;
    }

    /**
     * Builds the dialect-appropriate qualified table name for SQL generation.
     * Oracle gets {@code db.escapedTable}; SQL Server / PostgreSQL / Kingbase / DM
     * get {@code escapedDb.table}; everything else uses the bare table name.
     *
     * @param getTableFieldsDTO data-source id, database and table name
     * @return the qualified (and partially escaped) table reference
     */
    @Override
    public String getTableName(GetTableFieldsDTO getTableFieldsDTO) {
        DataSourceDTO dataSource = dataSourceService.getDataSource(getTableFieldsDTO.getDataSourceId());
        DataSourceDTO dataSourceDTO = new DataSourceDTO();
        dataSourceDTO.setType(dataSource.getType());
        AbstractDatabase db = databaseManager.findDb(dataSourceDTO);
        if (DataSourceTypeEnum.ORACLE.getCode() == dataSource.getType()) {
            return getTableFieldsDTO.getDatabaseName() + "." + db.escapeTbName(getTableFieldsDTO.getTableName(), getTableFieldsDTO.getDatabaseName());
        }
        if (DataSourceTypeEnum.SQLSERVER.getCode() == dataSource.getType() ||
                DataSourceTypeEnum.PGSQL.getCode() == dataSource.getType() ||
                DataSourceTypeEnum.KINGBASE.getCode() == dataSource.getType() ||
                DataSourceTypeEnum.DM.getCode() == dataSource.getType()) {
            return db.escapeDbName(getTableFieldsDTO.getDatabaseName()) + "." + getTableFieldsDTO.getTableName();
        }
        return getTableFieldsDTO.getTableName();
    }

    /**
     * Resolves the input fields (when {@code before} is true) or output fields of
     * one step in the graph by compiling it with validation skipped for that step.
     *
     * @param dto step name, optional neighbour step name, direction flag and graph
     * @return the field names flowing into/out of the step
     * @throws DataFactoryException when the fields cannot be resolved
     */
    @Override
    public List<String> inputOutputFields(GetInputOutputFieldsDTO dto) {
        VisGraph visGraph = UdmpBeanUtils.copy(dto, VisGraph.class);
        List<String> fieldList = new ArrayList<>();
        try {
            //设置不需要校验的步骤 — the inspected step itself may still be incomplete
            flowParser.setSkipValidateStep(Arrays.asList(dto.getStepName()));
            TransMeta transMeta = (TransMeta) codec.decode(flowParser.parseFlow(visGraph, "", false), false);
            StepMeta stepInfo = getStep(transMeta, dto.getStepName());

            RowMetaInterface fields;
            if (dto.getBefore()) {
                //获得输入字段 — fields entering the step, optionally restricted to one predecessor
                if (StringUtils.isNotEmpty(dto.getFromOrToStepName())) {
                    fields = transMeta.getPrevStepFields(stepInfo, dto.getFromOrToStepName(), null);
                } else {
                    fields = transMeta.getPrevStepFields(stepInfo, null);
                }
            } else {
                //获得输出字段 — fields the step emits
                fields = transMeta.getStepFields(stepInfo, null);
            }
            for (int i = 0; i < fields.size(); i++) {
                ValueMetaInterface v = fields.getValueMeta(i);
                fieldList.add(v.getName());
            }
        } catch (Exception e) {
            // FIX: pass the throwable directly so SLF4J logs the stack trace.
            log.error("输入或输出字段获取失败", e);
            throw new DataFactoryException(KETTLE_ETL_GET_INPUT_OUTPUT_FIELDS_FAIL);
        }
        return fieldList;
    }

    /**
     * Finds a step by name within a transformation.
     *
     * @param transMeta the transformation to search
     * @param label     the step name to match exactly
     * @return the matching step, or {@code null} when absent
     */
    public StepMeta getStep(TransMeta transMeta, String label) {
        for (StepMeta step : transMeta.getSteps()) {
            if (label.equals(step.getName())) {
                return step;
            }
        }
        return null;
    }

    /**
     * On startup, re-registers Quartz jobs for every definition stored as
     * scheduled with a complete repeat-run configuration.
     */
    @Override
    public void startAllDataJobFromDatabase() {
        List<DataFlowDefinition> dataFlowDefinitions = InitiallyUtils.markInitially(() -> this.getAllDataJobInfo());
        for (DataFlowDefinition dataFlowDefinition : dataFlowDefinitions) {
            //添加数据库中已存在的定时计算任务
            // FIX: equals() instead of unboxing "==1", which NPEs on a null repeatRun.
            if (ENABLE_REPEAT_RUN.equals(dataFlowDefinition.getRepeatRun())
                    && ObjectUtil.isNotNull(dataFlowDefinition.getScheduleStartTime())
                    && StrUtil.isNotBlank(dataFlowDefinition.getScheduleIntervalTimeUnit())
                    && ObjectUtil.isNotNull(dataFlowDefinition.getScheduleInterval())) {
                String cronExpression = TimeUtil.getCronExpression(dataFlowDefinition.getScheduleInterval(), dataFlowDefinition.getScheduleIntervalTimeUnit(), dataFlowDefinition.getScheduleStartTime());
                dataFlowJobManager.addJob(dataFlowDefinition.getId(), cronExpression, DATA_FLOW_JOB_GROUP, DataFlowJobTask.class);
            }
        }
    }

    /**
     * @return all definitions currently flagged as scheduled.
     */
    public List<DataFlowDefinition> getAllDataJobInfo() {
        LambdaQueryWrapper<DataFlowDefinition> qw = new LambdaQueryWrapper<>();
        qw.eq(DataFlowDefinition::getScheduled, ENABLE_SCHEDULE);
        return this.list(qw);
    }

    /**
     * Replaces the Quartz job of a definition: deletes any existing job and
     * registers a new one from the definition's schedule configuration.
     *
     * @param dataFlowId id of the definition
     */
    public void refreshDataJob(Long dataFlowId) {
        delDataJob(dataFlowId);
        DataFlowDefinition definition = InitiallyUtils.markInitially(() -> dataFlowDefinitionService.getById(dataFlowId));
        Assert.notNull(definition, "任务定义不存在");
        String cronExpression = TimeUtil.getCronExpression(definition.getScheduleInterval(), definition.getScheduleIntervalTimeUnit(), definition.getScheduleStartTime());
        dataFlowJobManager.addJob(dataFlowId, cronExpression, DATA_FLOW_JOB_GROUP, DataFlowJobTask.class);
    }

    /**
     * Deletes the Quartz job of a definition if one is registered.
     *
     * @param dataFlowId id of the definition
     */
    public void delDataJob(Long dataFlowId) {
        if (dataFlowJobManager.hasTask(dataFlowId, DATA_FLOW_JOB_GROUP)) {
            dataFlowJobManager.deleteJob(dataFlowId, DATA_FLOW_JOB_GROUP);
        }
    }

    /**
     * 检查定时调度是否有动态参数（${}）
     *
     * <p>FIX: the original returned on the FIRST TableInput/ExecSQL node found, so a
     * dynamic parameter in any later node was never detected. Now every node is checked.
     *
     * @param config the compiled flow configuration
     * @return true when any table-input or exec-SQL node contains a ${...} parameter
     */
    public Boolean checkHaveParam(FlowConfig config) {
        for (VisNode visNode : config.getSteps()) {
            String sql = null;
            if (visNode instanceof TableInputNode) {
                sql = ((TableInputNode) visNode).getSql();
            } else if (visNode instanceof ExecSQLNode) {
                sql = ((ExecSQLNode) visNode).getSql();
            }
            if (sql != null && StrUtil.contains(sql, "${") && StrUtil.contains(sql, "}")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds a preview SELECT for the given table using its escaped column names.
     *
     * @param dto data-source id, database and table name
     * @return a "select col1,col2 from table" statement
     * @throws DataFactoryException when the fields cannot be fetched
     */
    @Override
    public String previewSql(GetTableFieldsDTO dto) {
        try {
            List<String> fieldNames = dataFlowDefinitionService.tableFields(dto, true);
            return "select " + Joiner.on(",").join(fieldNames) + " from " + dataFlowDefinitionService.getTableName(dto);
        } catch (Exception e) {
            // FIX: pass the throwable directly so SLF4J logs the stack trace.
            log.error("字段获取失败", e);
            throw new DataFactoryException(ErrorCode.SQL_FAST_CREATE_ERROR);
        }
    }

    /**
     * 获得单个流程的数据源
     *
     * @param definitionId id of the definition
     * @return the database (data source) names referenced by the compiled flow
     */
    @Override
    public List<String> getDataSourceIdFromFlow(String definitionId) {
        String xml = dataFlowDefinitionService.getById(definitionId).getKettleDefinition();
        TransMeta transMeta = (TransMeta) codec.decode(xml);
        return transMeta.getDatabases().stream().map(db -> db.getName()).collect(Collectors.toList());
    }

    /**
     * 获得所有流程的数据源
     *
     * @return distinct database (data source) names across all published flows;
     *         flows with no or unparsable XML are skipped
     */
    @Override
    public List<String> getDataSourceIdFromFlow() {
        Set<String> dataSourceIds = new HashSet<>();
        for (DataFlowDefinition dataFlowDefinition : dataFlowDefinitionService.list()) {
            if (StringUtils.isEmpty(dataFlowDefinition.getKettleDefinition())) {
                continue;
            }
            try {
                TransMeta transMeta = (TransMeta) codec.decode(dataFlowDefinition.getKettleDefinition());
                dataSourceIds.addAll(transMeta.getDatabases().stream().map(db -> db.getName()).collect(Collectors.toSet()));
            } catch (Exception e) {
                // Best-effort: one broken definition must not block the whole listing.
                log.error("{}解析失败跳过", dataFlowDefinition.getName(), e);
            }
        }
        return Lists.newArrayList(dataSourceIds);
    }

    /**
     * Extracts the table names referenced by the SQL of an incremental task request.
     *
     * @param getIncreTablesReqDTO carries the SQL to parse
     * @return the table names found in the SQL
     */
    @Override
    public List<String> increTables(GetIncreTablesReqDTO getIncreTablesReqDTO) {
        return SqlUtil.getSqlTableNames(getIncreTablesReqDTO.getSql());
    }

    /**
     * Finds the graph node that acts as the incremental source: a TableInput node
     * connected by exactly one edge (degree == 1).
     *
     * <p>FIX: edge ids were stored stringified ({@code String.valueOf}) but looked up
     * with the raw node id object, so non-String ids never matched; the node id is
     * now normalised the same way before the lookup. Also made the nodeType
     * comparison null-safe.
     *
     * @param definition the definition whose graph is inspected
     * @return the incremental node map, or {@code null} when none qualifies
     */
    private Map<String, Object> getIncreNode(FlowDefinitionModifyDTO definition) {
        // Degree per node id = number of edges touching it (in + out).
        Map<String, Integer> degreeById = new HashMap<>();
        for (Map<String, Object> edge : definition.getUserConfig().getEdges()) {
            degreeById.merge(String.valueOf(edge.get("sourceId")), 1, Integer::sum);
            degreeById.merge(String.valueOf(edge.get("targetId")), 1, Integer::sum);
        }

        for (Map<String, Object> node : definition.getUserConfig().getNodes()) {
            String nodeId = String.valueOf(node.get("id"));
            if (Integer.valueOf(1).equals(degreeById.get(nodeId))
                    && KettleDataFlowNodeType.TableInput.name().equals(node.get("nodeType"))) {
                return node;
            }
        }
        return null;
    }

    /**
     * Creates a new (unpublished, unscheduled) flow definition.
     *
     * @param flowDTO name, group and description of the new flow
     * @return the generated snowflake id
     * @throws DataFactoryException when the name already exists in the group
     */
    @Override
    public Long createFlow(FlowDTO flowDTO) {
        if (dataFlowDefinitionService.count(Wrappers.lambdaQuery(DataFlowDefinition.class).eq(DataFlowDefinition::getName, flowDTO.getFlowName()).eq(DataFlowDefinition::getGroupId, flowDTO.getGroupId())) > 0) {
            throw new DataFactoryException(KETTLE_ETL_CREATE_FAIL, "任务流名称重复");
        }
        Long id = snowflakeUtil.snowflakeId();
        DataFlowDefinition dataFlowDefinition = new DataFlowDefinition();
        dataFlowDefinition.setId(id);
        dataFlowDefinition.setGroupId(flowDTO.getGroupId());
        dataFlowDefinition.setName(flowDTO.getFlowName());
        dataFlowDefinition.setDescription(flowDTO.getDescription());
        dataFlowDefinition.setCreatedAt(LocalDateTime.now());
        dataFlowDefinition.setPublishStatus(PUBLISH_STATUS_SAVED);
        dataFlowDefinition.setScheduled(DISABLE_SCHEDULE);
        dataFlowDefinition.setCreator(currentUserDisplayName());
        definitionMapper.insertSelective(dataFlowDefinition);

        return id;
    }

    /**
     * Updates the base attributes (group, name, description) of a flow,
     * enforcing name uniqueness within the group.
     *
     * @param flowDTO the new base attributes, including the definition id
     * @throws DataFactoryException when the name already exists in the group
     */
    @Override
    public void updateFlowBase(FlowDTO flowDTO) {
        if (dataFlowDefinitionService.count(Wrappers.lambdaQuery(DataFlowDefinition.class).eq(DataFlowDefinition::getName, flowDTO.getFlowName()).eq(DataFlowDefinition::getGroupId, flowDTO.getGroupId()).ne(DataFlowDefinition::getId, flowDTO.getDefinitionId())) > 0) {
            throw new DataFactoryException(KETTLE_ETL_CREATE_FAIL, "任务流名称重复");
        }
        DataFlowDefinition dataFlowDefinition = new DataFlowDefinition();
        dataFlowDefinition.setId(Long.valueOf(flowDTO.getDefinitionId()));
        dataFlowDefinition.setGroupId(flowDTO.getGroupId());
        dataFlowDefinition.setName(flowDTO.getFlowName());
        dataFlowDefinition.setDescription(flowDTO.getDescription());
        dataFlowDefinition.setUpdatedAt(LocalDateTime.now());
        dataFlowDefinition.setUpdator(currentUserDisplayName());
        definitionMapper.updateById(dataFlowDefinition);
    }

    /**
     * Copies an existing definition under a new name/group as an unpublished,
     * unscheduled draft owned by the current user.
     *
     * @param flowDTO source definition id plus the copy's name/group/description
     * @throws DataFactoryException when the target name already exists in the group
     */
    @Override
    public void copy(FlowDTO flowDTO) {
        if (dataFlowDefinitionService.count(Wrappers.lambdaQuery(DataFlowDefinition.class).eq(DataFlowDefinition::getName, flowDTO.getFlowName()).eq(DataFlowDefinition::getGroupId, flowDTO.getGroupId())) > 0) {
            throw new DataFactoryException(KETTLE_ETL_CREATE_FAIL, "任务流名称重复");
        }

        DataFlowDefinition dataFlowDefinition = this.getById(flowDTO.getDefinitionId());
        dataFlowDefinition.setId(snowflakeUtil.snowflakeId());
        dataFlowDefinition.setGroupId(flowDTO.getGroupId());
        dataFlowDefinition.setName(flowDTO.getFlowName());
        dataFlowDefinition.setDescription(flowDTO.getDescription());
        dataFlowDefinition.setCreatedAt(LocalDateTime.now());
        dataFlowDefinition.setPublishStatus(PUBLISH_STATUS_SAVED);
        dataFlowDefinition.setScheduled(DISABLE_SCHEDULE);
        dataFlowDefinition.setUpdator(null);
        dataFlowDefinition.setUpdatedAt(null);
        dataFlowDefinition.setCreator(currentUserDisplayName());
        definitionMapper.insertSelective(dataFlowDefinition);
    }

    /**
     * Display name of the current login user: nickname when present, otherwise
     * the login name. Extracted from four duplicated ternaries.
     */
    private String currentUserDisplayName() {
        String nickname = loginUserManager.getCurrentLoginUser().getNickname();
        return StrUtil.isBlank(nickname) ? loginUserManager.getCurrentLoginUser().getLoginName() : nickname;
    }
}
